302 - Pipeline Image Transformation with OpenCV

This example shows how to manipulate a collection of images. First, the images are downloaded to a local directory. Second, they are copied to your cluster's attached HDFS.


In [ ]:
IMAGE_PATH = "datasets/CIFAR10"

import os, subprocess
from urllib.request import urlretrieve

dataFile = "test.zip"
if not os.path.isdir(IMAGE_PATH):
    os.makedirs(IMAGE_PATH)
    urlretrieve("https://mmlspark.azureedge.net/datasets/CIFAR10/" + dataFile,
                IMAGE_PATH + ".zip")
    print(subprocess.check_output(
              "cd \"%s\" && unzip -q \"../$(basename $PWD).zip\"" % IMAGE_PATH,
              stderr=subprocess.STDOUT, shell=True)
          .decode("utf-8"))

In [ ]:
%%local
IMAGE_PATH = "/datasets/CIFAR10/test"
import subprocess
if subprocess.call(["hdfs", "dfs", "-test", "-d", IMAGE_PATH]):
    from urllib.request import urlretrieve
    urlretrieve("https://mmlspark.azureedge.net/datasets/CIFAR10/test.zip", "/tmp/test.zip")
    print(subprocess.check_output(
            "rm -rf /tmp/CIFAR10 && mkdir -p /tmp/CIFAR10 && unzip /tmp/test.zip -d /tmp/CIFAR10",
            stderr=subprocess.STDOUT, shell=True))
    print(subprocess.check_output(
            "hdfs dfs -mkdir -p %s" % IMAGE_PATH,
            stderr=subprocess.STDOUT, shell=True))
    print(subprocess.check_output(
            "hdfs dfs -copyFromLocal -f /tmp/CIFAR10/test/011*.png %s" % IMAGE_PATH,
            stderr=subprocess.STDOUT, shell=True))

In [ ]:
IMAGE_PATH = "/datasets/CIFAR10/test"

The images are loaded from the directory (for fast prototyping, consider loading only a fraction of the images). In the resulting DataFrame, each image is a single field in the image column, with sub-fields for the path, height, width, OpenCV type, and OpenCV bytes.


In [ ]:
import mmlspark
import numpy as np
from mmlspark import toNDArray

images = spark.readImages(IMAGE_PATH, recursive = True, sampleRatio = 0.1).cache()
images.printSchema()
print(images.count())

When collected from the DataFrame, image data is stored in a Row, which is Spark's way of representing structures (in this example, each DataFrame row contains a single image, which is itself a Row). Image fields can be addressed by name, and the toNDArray() helper function converts an image into a numpy array for further manipulation.
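Conceptually, the conversion reshapes the flat buffer of OpenCV bytes into a height × width × channels array. A minimal numpy sketch of that idea, independent of MMLSpark's actual implementation:

```python
import numpy as np

# Illustrative sketch: how a flat buffer of OpenCV-style pixel bytes maps
# onto a (height, width, channels) ndarray.
height, width, channels = 2, 3, 3              # tiny 2x3 three-channel "image"
raw = bytes(range(height * width * channels))  # 18 bytes of pixel data

arr = np.frombuffer(raw, dtype=np.uint8).reshape(height, width, channels)
print(arr.shape)     # (2, 3, 3)
print(arr[0, 0])     # the first pixel's three channel values
```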


In [ ]:
from PIL import Image

data = images.take(3)    # take the first three rows of the DataFrame
im = data[2][0]          # the image is the first column of the third row

print("image type: {}, number of fields: {}".format(type(im), len(im)))
print("image path: {}".format(im.path))
print("height: {}, width: {}, OpenCV type: {}".format(im.height, im.width, im.type))

arr = toNDArray(im)           # convert to numpy array
print(images.count())
Image.fromarray(arr, "RGB")   # display the image inside the notebook

Use ImageTransformer for basic image manipulations: resizing, cropping, etc. Internally, the operations are pipelined and backed by the OpenCV implementation.
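The crop step of such a pipeline corresponds to a plain array slice; only the resize step needs interpolation (e.g. OpenCV's cv2.resize), which is omitted in this standalone numpy sketch:

```python
import numpy as np

# Cropping the top-left 180x180 region of a 200x200 image is just slicing
# the first two (height, width) axes; the channel axis is kept whole.
img = np.zeros((200, 200, 3), dtype=np.uint8)
cropped = img[0:180, 0:180]
print(cropped.shape)    # (180, 180, 3)
```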


In [ ]:
from mmlspark import ImageTransformer

tr = (ImageTransformer()                  # images are resized and then cropped
      .setOutputCol("transformed")
      .resize(height = 200, width = 200)
      .crop(0, 0, height = 180, width = 180) )

small = tr.transform(images).select("transformed")

im = small.take(3)[2][0]                  # take third image
Image.fromarray(toNDArray(im), "RGB")   # display the image inside notebook

For advanced image manipulations, use Spark UDFs. The MMLSpark package provides conversion functions between the Spark Row and ndarray image representations.


In [ ]:
from pyspark.sql.functions import udf
from mmlspark import ImageSchema, toNDArray, toImage

def u(row):
    array = toNDArray(row)    # convert the image Row to a numpy ndarray [height, width, 3]
    array[:, :, 2] = 0        # zero out one color channel
    return toImage(array)     # convert the numpy array back to a Spark Row structure

noBlueUDF = udf(u, ImageSchema)

noblue = small.withColumn("noblue", noBlueUDF(small["transformed"])).select("noblue")

im = noblue.take(3)[2][0]                # take the third image
Image.fromarray(toNDArray(im), "RGB")   # display the image inside notebook
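The channel-zeroing step inside the UDF can be checked outside Spark with plain numpy (a standalone sketch, not tied to MMLSpark):

```python
import numpy as np

# Standalone check of the channel-zeroing step from the UDF above.
arr = np.arange(2 * 2 * 3, dtype=np.uint8).reshape(2, 2, 3)
arr[:, :, 2] = 0                 # zero out channel index 2 for every pixel
print(arr[:, :, 2].max())        # 0: the channel is cleared
print(arr[:, :, 0].max())        # the other channels are untouched
```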

Images can be unrolled into dense 1-D vectors suitable for CNTK evaluation.
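Unrolling simply flattens the height × width × channels array into one long vector. A numpy sketch of the idea (UnrollImage's exact element order is an implementation detail; the row-major flattening shown here is only an illustration):

```python
import numpy as np

# Flatten an HxWx3 image into a 1-D vector of length H * W * 3.
h, w = 4, 5
img = np.zeros((h, w, 3), dtype=np.uint8)
vec = img.ravel()       # row-major 1-D view
print(len(vec))         # 60
```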


In [ ]:
from mmlspark import UnrollImage

unroller = UnrollImage().setInputCol("noblue").setOutputCol("unrolled")

unrolled = unroller.transform(noblue).select("unrolled")

vector = unrolled.take(1)[0][0]
print(type(vector))
len(vector.toArray())